home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.5)
-
import re
import sys
import types
from copy import copy as _copy

from ZSI import _copyright, _children, _child_elements, _inttypes, _stringtypes, _seqtypes, _find_arraytype, _find_href, _find_type, _find_xmlns_prefix, _get_idstr, EvaluateException, ParseException
from TC import _get_element_nsuri_name, _get_xsitype, TypeCode, Any, AnyElement, AnyType, Nilled, UNBOUNDED
from schema import GED, ElementDeclaration, TypeDefinition, _get_substitute_element, _get_type_definition, _is_substitute_element
from ZSI.wstools.Namespaces import SCHEMA, SOAP
from ZSI.wstools.Utility import SplitQName
from ZSI.wstools.logging import getLogger as _GetLogger
-
- _find_arrayoffset = lambda E: E.getAttributeNS(SOAP.ENC, 'offset')
-
- _find_arrayposition = lambda E: E.getAttributeNS(SOAP.ENC, 'position')
- _offset_pat = re.compile('\\[[0-9]+\\]')
- _position_pat = _offset_pat
-
- def _check_typecode_list(ofwhat, tcname):
- for o in ofwhat:
- if callable(o):
- continue
-
- if not isinstance(o, TypeCode):
- raise TypeError(tcname + ' ofwhat outside the TypeCode hierarchy, ' + str(o.__class__))
-
- if o.pname is None and not isinstance(o, AnyElement):
- raise TypeError(tcname + ' element ' + str(o) + ' has no name')
- continue
-
-
-
- def _get_type_or_substitute(typecode, pyobj, sw, elt):
- sub = getattr(pyobj, 'typecode', typecode)
- if sub is typecode or sub is None:
- return typecode
-
- if isinstance(typecode, AnyElement):
- return sub
-
- if isinstance(sub, ElementDeclaration):
- if (typecode.nspname, typecode.pname) == (sub.nspname, sub.pname):
- raise TypeError('bad usage, failed to serialize element reference (%s, %s), in: %s' % (typecode.nspname, typecode.pname, sw.Backtrace(elt)))
-
- if _is_substitute_element(typecode, sub):
- return sub
-
- raise TypeError('failed to serialize (%s, %s) illegal sub GED (%s,%s): %s' % (typecode.nspname, typecode.pname, sub.nspname, sub.pname, sw.Backtrace(elt)))
-
- if not isinstance(typecode, AnyType) and not isinstance(sub, typecode.__class__):
- raise TypeError('failed to serialize substitute %s for %s, not derivation: %s' % (sub, typecode, sw.Backtrace(elt)))
-
- sub = _copy(sub)
- sub.nspname = typecode.nspname
- sub.pname = typecode.pname
- sub.aname = typecode.aname
- sub.minOccurs = sub.maxOccurs = 1
- return sub
-
-
class ComplexType(TypeCode):
    """Typecode for an element made of attributes, optional mixed text and
    a sequence of child typecodes (``ofwhat``).

    NOTE(review): this class was recovered from bytecode by a decompiler.
    Short-circuit expressions that the decompiler had flattened into
    ``if ...: pass`` have been restored, but two loop bodies inside
    ``parse`` were not recovered at all; see the notes in that method.
    """

    logger = _GetLogger('ZSI.TCcompound.ComplexType')

    def __init__(self, pyclass, ofwhat, pname=None, inorder=False,
                 inline=False, mutable=True, mixed=False,
                 mixed_aname='_text', **kw):
        """Set up the typecode.

        pyclass     -- called with no arguments by ``parse`` to create the
                       result object; when falsy ``parse`` returns a dict
        ofwhat      -- list or tuple of child typecodes (entries may also be
                       callables returning a typecode)
        pname       -- forwarded to ``TypeCode.__init__``
        inorder     -- stored as ``self.inorder``
        inline      -- serialize in place instead of by reference; forced to
                       True whenever ``mutable`` is True
        mutable     -- when False, ``cb`` skips objects ``sw.Known`` reports
        mixed       -- when True, text content is handled through the
                       attribute named by ``mixed_aname``
        mixed_aname -- attribute name for mixed text (only kept if ``mixed``
                       is True)
        kw          -- forwarded to ``TypeCode.__init__``; a truthy ``type``
                       entry becomes ``self.type``

        Raises TypeError if ``ofwhat`` is not one of ``_seqtypes`` or, with
        ``TypeCode.typechecks`` enabled, if a member of ``ofwhat`` is invalid.
        """
        TypeCode.__init__(self, pname, pyclass=pyclass, **kw)
        self.inorder = inorder
        self.inline = inline
        self.mutable = mutable
        self.mixed = mixed
        self.mixed_aname = None
        if mixed is True:
            self.mixed_aname = mixed_aname

        if self.mutable is True:
            self.inline = True

        # The decompiler had turned this ``or`` into a no-op ``if`` followed
        # by an unconditional assignment, which threw away an explicit
        # ``type`` keyword.
        self.type = kw.get('type') or _get_xsitype(self)

        t = type(ofwhat)
        if t not in _seqtypes:
            raise TypeError('Struct ofwhat must be list or sequence, not ' + str(t))

        self.ofwhat = tuple(ofwhat)
        if TypeCode.typechecks:
            # NOTE(review): isinstance(x, object) is true for every value, so
            # this check can never raise.  Left as is because tightening it
            # could reject pyclass values that callers pass today.
            if (self.pyclass is not None
                    and type(self.pyclass) is not types.ClassType
                    and not isinstance(self.pyclass, object)):
                raise TypeError('pyclass must be None or an old-style/new-style class, not ' + str(type(self.pyclass)))

            _check_typecode_list(self.ofwhat, 'ComplexType')

    def parse(self, elt, ps):
        """Parse element ``elt`` into a Python value.

        Returns ``Nilled`` for a nilled element, the result of the
        substitute type's ``parse`` when the element's type differs and this
        typecode is a ``TypeDefinition``, a dict when ``self.pyclass`` is
        falsy, and otherwise a ``self.pyclass()`` instance with one attribute
        set per parsed key.

        Raises EvaluateException for a type mismatch, for an element with
        both content and an href, or for an ``<any>`` occurrence count
        outside its bounds.  Raises TypeError when ``self.pyclass()`` fails.

        NOTE(review): the statements that match child elements against
        ``self.ofwhat`` are missing from the decompiled source (see below),
        so in its current state this method only returns attributes and
        mixed text.
        """
        debug = self.logger.debugOn()
        if debug:
            self.logger.debug('parse')

        xtype = self.checkname(elt, ps)
        if self.type and xtype not in [self.type, (None, None)]:
            if not isinstance(self, TypeDefinition):
                raise EvaluateException('ComplexType for %s has wrong type(%s), looking for %s' % (self.pname, self.checktype(elt, ps), self.type), ps.Backtrace(elt))
            if debug:
                self.logger.debug('delegate to substitute type')
            what = TypeDefinition.getSubstituteType(self, elt, ps)
            return what.parse(elt, ps)

        href = _find_href(elt)
        if href:
            if _children(elt):
                raise EvaluateException('Struct has content and HREF', ps.Backtrace(elt))
            elt = ps.FindLocalHREF(href, elt)

        c = _child_elements(elt)
        count = len(c)
        if self.nilled(elt, ps):
            return Nilled

        v = {}
        attributes = self.parse_attributes(elt, ps)
        if attributes:
            v[self.attrs_aname] = attributes

        if self.mixed is True:
            v[self.mixed_aname] = self.simple_value(elt, ps, mixed=True)

        # Work on a copy of the child list.
        c = c[:]
        crange = range(len(c))
        if debug:
            self.logger.debug('ofwhat: %s', str(self.ofwhat))

        any = None
        # NOTE(review): the decompiler emitted an empty body for this loop.
        # Presumably it matched children against each ``ofwhat`` entry,
        # filled ``v`` and assigned ``any``; that code is not present in the
        # recovered source and has to be restored from the original module.
        # As it stands no child element is parsed and ``any`` stays None.
        for i in range(len(self.ofwhat)):
            pass

        if any is not None:
            occurs = 0
            v[any.aname] = []
            # NOTE(review): decompiler residue, kept verbatim.  ``_[3]`` looks
            # like the hidden temporary of a list comprehension over
            # ``(j, c[j]) for j in crange if c[j]``; the loop body that
            # consumed those pairs (and updated ``occurs``) was not
            # recovered.  Unreachable while ``any`` is always None.
            for j in crange:
                if c[j]:
                    continue
                _[3][(j, c[j])]

            if any.maxOccurs == 1 and occurs == 0:
                v[any.aname] = None
            # The decompiler grouped this as ``(A or B) and C``, which let a
            # count below minOccurs through; the error text shows that both
            # bounds are meant to be enforced.
            elif occurs < any.minOccurs or (any.maxOccurs != UNBOUNDED and any.maxOccurs < occurs):
                raise EvaluateException('occurances of <any> elements(#%d) bound by (%d,%s)' % (occurs, any.minOccurs, str(any.maxOccurs)), ps.Backtrace(elt))

        if not self.pyclass:
            return v

        try:
            pyobj = self.pyclass()
        except Exception:
            # The decompiled handler assigned ``e = None``, so the message
            # always ended in "None".  sys.exc_info() recovers the exception
            # with syntax that works on every Python version.
            e = sys.exc_info()[1]
            raise TypeError('Constructing element (%s,%s) with pyclass(%s), %s' % (self.nspname, self.pname, self.pyclass.__name__, str(e)))

        for key in v.keys():
            setattr(pyobj, key, v[key])

        return pyobj

    def serialize(self, elt, sw, pyobj, inline=False, name=None, **kw):
        """Serialize ``pyobj`` under ``elt``, inline or by reference.

        When ``inline`` or ``self.inline`` is true the work is done
        immediately by ``cb``.  Otherwise an element carrying
        ``href="#<objid>"`` is appended and ``cb`` is registered through
        ``sw.AddCallback`` (without ``name`` or ``kw``).
        """
        if inline or self.inline:
            self.cb(elt, sw, pyobj, name=name, **kw)
        else:
            objid = _get_idstr(pyobj)
            ns, n = self.get_name(name, objid)
            el = elt.createAppendElement(ns, n)
            el.setAttributeNS(None, 'href', '#%s' % objid)
            sw.AddCallback(self.cb, elt, sw, pyobj)

    def cb(self, elt, sw, pyobj, name=None, **kw):
        """Write ``pyobj`` and its members beneath ``elt``.

        ``pyobj`` is read through ``getattr`` when ``self.pyclass`` is a
        new-style class, through ``pyobj.__dict__`` for any other pyclass,
        and as a dict when there is no pyclass.

        Returns None.  Raises EvaluateException for a None value that is not
        nillable, for non-string mixed text, for ``unique`` without
        ``inline``, and for members whose value or occurrence count violates
        their declaration.  Raises TypeError when there is no pyclass and,
        with ``TypeCode.typechecks`` enabled, ``pyobj`` is not a dict.
        """
        debug = self.logger.debugOn()
        if debug:
            self.logger.debug('cb: %s' % str(self.ofwhat))

        objid = _get_idstr(pyobj)
        ns, n = self.get_name(name, objid)
        if pyobj is None:
            if self.nillable is True:
                elem = elt.createAppendElement(ns, n)
                self.serialize_as_nil(elem)
                return None
            raise EvaluateException('element(%s,%s) is not nillable(%s)' % (self.nspname, self.pname, self.nillable))

        if self.mutable is False and sw.Known(pyobj):
            return None

        if debug:
            self.logger.debug('element: (%s, %s)', str(ns), n)

        if n is not None:
            elem = elt.createAppendElement(ns, n)
            self.set_attributes(elem, pyobj)
            if kw.get('typed', self.typed) is True:
                self.set_attribute_xsi_type(elem)

            # Mixed text content is written before any child element.
            if self.mixed is True and self.mixed_aname is not None:
                if hasattr(pyobj, self.mixed_aname):
                    textContent = getattr(pyobj, self.mixed_aname)
                    if hasattr(textContent, 'typecode'):
                        textContent.typecode.serialize_text_node(elem, sw, textContent)
                    elif type(textContent) in _stringtypes:
                        if debug:
                            self.logger.debug('mixed text content:\n\t%s', textContent)
                        elem.createAppendTextNode(textContent)
                    else:
                        raise EvaluateException('mixed test content in element (%s,%s) must be a string type' % (self.nspname, self.pname), sw.Backtrace(elt))
                elif debug:
                    self.logger.debug('mixed NO text content in %s', self.mixed_aname)
        else:
            # No element name: members are appended directly to ``elt``.
            elem = elt

        if self.inline:
            pass
        elif not self.inline and self.unique:
            raise EvaluateException('Not inline, but unique makes no sense. No href/id.', sw.Backtrace(elt))
        elif n is not None:
            self.set_attribute_id(elem, objid)

        # Pick the accessor used to read each member value from pyobj.
        if self.pyclass and type(self.pyclass) is type:
            f = lambda attr: getattr(pyobj, attr, None)
        elif self.pyclass:
            d = pyobj.__dict__
            f = lambda attr: d.get(attr)
        else:
            d = pyobj
            f = lambda attr: pyobj.get(attr)
            if TypeCode.typechecks and type(d) != types.DictType:
                raise TypeError("Classless complexType didn't get dictionary")

        indx = 0
        lenofwhat = len(self.ofwhat)
        if debug:
            self.logger.debug('element declaration (%s,%s)', self.nspname, self.pname)
            if self.type:
                self.logger.debug('xsi:type definition (%s,%s)', self.type[0], self.type[1])
            else:
                self.logger.warning('NO xsi:type')

        while indx < lenofwhat:
            occurs = 0
            what = self.ofwhat[indx]
            # A callable entry is called to obtain the actual typecode.
            if callable(what):
                what = what()

            if debug:
                self.logger.debug('serialize what -- %s', what.__class__.__name__)

            aname = what.aname
            v = f(aname)
            indx += 1
            if what.minOccurs == 0 and v is None:
                continue

            # ``whatTC`` keeps the declared typecode; ``what`` may be
            # replaced by a substitute for an individual value.
            whatTC = what
            if whatTC.maxOccurs > 1 and v is not None:
                if type(v) not in _seqtypes:
                    raise EvaluateException('pyobj (%s,%s), aname "%s": maxOccurs %s, expecting a %s' % (self.nspname, self.pname, what.aname, whatTC.maxOccurs, _seqtypes), sw.Backtrace(elt))

                for v2 in v:
                    occurs += 1
                    if occurs > whatTC.maxOccurs:
                        raise EvaluateException('occurances (%d) exceeded maxOccurs(%d) for <%s>' % (occurs, whatTC.maxOccurs, what.pname), sw.Backtrace(elt))

                    what = _get_type_or_substitute(whatTC, v2, sw, elt)
                    if debug and what is not whatTC:
                        self.logger.debug('substitute derived type: %s' % what.__class__)

                    what.serialize(elem, sw, v2, **kw)

                if occurs < whatTC.minOccurs:
                    raise EvaluateException('occurances(%d) less than minOccurs(%d) for <%s>' % (occurs, whatTC.minOccurs, what.pname), sw.Backtrace(elt))
                continue

            if v is not None or what.nillable is True:
                what = _get_type_or_substitute(whatTC, v, sw, elt)
                if debug and what is not whatTC:
                    self.logger.debug('substitute derived type: %s' % what.__class__)

                what.serialize(elem, sw, v, **kw)
                continue

            raise EvaluateException('Got None for nillable(%s), minOccurs(%d) element (%s,%s), %s' % (what.nillable, what.minOccurs, what.nspname, what.pname, elem), sw.Backtrace(elt))

    def setDerivedTypeContents(self, extensions=None, restrictions=None):
        """Replace ``self.ofwhat`` for a derived type.

        A truthy ``extensions`` (one typecode or a sequence) is appended to
        the current ``ofwhat``; otherwise a truthy ``restrictions`` (one
        typecode or a sequence) replaces it.  When both are falsy nothing
        changes.  ``self.lenofwhat`` is updated together with ``ofwhat``.
        Returns None.
        """
        if extensions:
            ofwhat = list(self.ofwhat)
            if type(extensions) in _seqtypes:
                ofwhat += list(extensions)
            else:
                ofwhat.append(extensions)
        elif restrictions:
            if type(restrictions) in _seqtypes:
                ofwhat = restrictions
            else:
                ofwhat = (restrictions,)
        else:
            return None

        self.ofwhat = tuple(ofwhat)
        self.lenofwhat = len(self.ofwhat)
-
-
-
class Struct(ComplexType):
    """A ComplexType with two extra constraints on its members: none other
    than an ``AnyElement`` may have ``maxOccurs`` greater than 1, and no two
    may share the same ``(nspname, pname)`` pair.
    """

    logger = _GetLogger('ZSI.TCcompound.Struct')

    def __init__(self, pyclass, ofwhat, pname=None, inorder=False,
                 inline=False, mutable=True, **kw):
        """Initialise as a ComplexType, then enforce the Struct constraints.

        All arguments are passed through to ``ComplexType.__init__``.
        Raises TypeError when a constraint is violated.
        """
        ComplexType.__init__(self, pyclass, ofwhat, pname=pname,
                             inorder=inorder, inline=inline,
                             mutable=mutable, **kw)

        tags = [(member.nspname, member.pname) for member in self.ofwhat]
        for position, member in enumerate(self.ofwhat):
            tag = (member.nspname, member.pname)
            if not isinstance(member, AnyElement) and member.maxOccurs > 1:
                raise TypeError('Constraint: no element can have a maxOccurs>1')

            # Only later members need checking; earlier ones were already
            # compared against this one.
            later_tags = tags[position + 1:]
            if tag in later_tags:
                raise TypeError('Constraint: No element may have the same name as any other')
-
-
-
-
class Array(TypeCode):
    """Typecode for a single-dimensioned array whose items all share one
    typecode (``ofwhat``).

    NOTE(review): this class was recovered from bytecode by a decompiler;
    the ``or`` expressions it had flattened into ``if ...: pass`` followed by
    an unconditional assignment have been restored.
    """

    logger = _GetLogger('ZSI.TCcompound.Array')

    def __init__(self, atype, ofwhat, pname=None, dimensions=1, fill=None,
                 sparse=False, mutable=False, size=None, nooffset=0,
                 undeclared=False, childnames=None, **kw):
        """Set up the typecode.

        atype      -- two-item sequence; unless ``undeclared`` is True its
                      second item gets "[]" appended when it does not already
                      end with "]"
        ofwhat     -- typecode of the items; a falsy value selects ``Any()``
        pname      -- forwarded to ``TypeCode.__init__``
        dimensions -- must be 1
        fill       -- value used for positions that have no item
        sparse     -- when True values are lists of ``(position, item)``
        mutable    -- when False, ``serialize`` skips objects ``sw.Known``
                      reports
        size       -- integer or sequence of integers, stored as a tuple
        nooffset   -- stored as given; see the note in ``serialize``
        undeclared -- when True no array type is required or emitted
        childnames -- default element name for serialized items
        kw         -- forwarded to ``TypeCode.__init__``

        Raises TypeError for unsupported dimensions and, with
        ``TypeCode.typechecks`` enabled, for an invalid ``atype``, ``ofwhat``
        or ``size``.
        """
        TypeCode.__init__(self, pname, **kw)
        self.dimensions = dimensions
        self.atype = atype
        if undeclared is False and self.atype[1].endswith(']') is False:
            self.atype = (self.atype[0], '%s[]' % self.atype[1])

        if self.dimensions != 1:
            raise TypeError('Only single-dimensioned arrays supported')

        self.fill = fill
        self.sparse = sparse
        self.mutable = mutable
        self.size = size
        self.nooffset = nooffset
        self.undeclared = undeclared
        self.childnames = childnames
        if self.size:
            t = type(self.size)
            if t in _inttypes:
                self.size = (self.size,)
            elif t in _seqtypes:
                self.size = tuple(self.size)
            elif TypeCode.typechecks:
                raise TypeError('Size must be integer or list, not ' + str(t))

        # The decompiled code replaced every ``ofwhat`` with ``Any()``.
        ofwhat = ofwhat or Any()

        if TypeCode.typechecks:
            # The decompiled test read ``... and len(atype) == 2``, which
            # contradicts its own message: it rejected only two-item values
            # of the wrong type and accepted sequences of any other length.
            if self.undeclared is False and (type(atype) not in _seqtypes or len(atype) != 2):
                raise TypeError('Array type must be a sequence of len 2.')

            if not isinstance(ofwhat, TypeCode):
                raise TypeError('Array ofwhat outside the TypeCode hierarchy, ' + str(ofwhat.__class__))

            if self.size:
                if len(self.size) != self.dimensions:
                    raise TypeError('Array dimension/size mismatch')

                for s in self.size:
                    if type(s) not in _inttypes:
                        raise TypeError('Array size "' + str(s) + '" is not an integer.')

        self.ofwhat = ofwhat

    def parse_offset(self, elt, ps):
        """Return the array offset declared on ``elt`` as an int.

        Returns 0 when the attribute is missing or empty.  Raises
        EvaluateException when the value does not match ``_offset_pat``.
        """
        o = _find_arrayoffset(elt)
        if not o:
            return 0

        if not _offset_pat.match(o):
            raise EvaluateException('Bad offset "' + o + '"', ps.Backtrace(elt))

        # Strip the surrounding brackets.
        return int(o[1:-1])

    def parse_position(self, elt, ps):
        """Return the array position declared on ``elt`` as an int.

        Returns None when the attribute is missing or empty.  Raises
        EvaluateException when the value contains a comma or does not match
        ``_position_pat``.
        """
        o = _find_arrayposition(elt)
        if not o:
            return None

        if o.find(',') > -1:
            raise EvaluateException('Sorry, no multi-dimensional arrays', ps.Backtrace(elt))

        if not _position_pat.match(o):
            raise EvaluateException('Bad array position "' + o + '"', ps.Backtrace(elt))

        # Strip the surrounding brackets.
        return int(o[1:-1])

    def parse(self, elt, ps):
        """Parse element ``elt`` into a list.

        Returns ``Nilled`` for a nilled element.  For a sparse array the
        result is a list of ``(position, item)`` tuples; otherwise it is a
        list of items in which skipped positions hold ``self.fill``.

        Raises EvaluateException when the element has both content and an
        href, or when no array type is found and ``self.undeclared`` is
        False.
        """
        href = _find_href(elt)
        if href:
            if _children(elt):
                raise EvaluateException('Array has content and HREF', ps.Backtrace(elt))
            elt = ps.FindLocalHREF(href, elt)

        if self.nilled(elt, ps):
            return Nilled

        if not _find_arraytype(elt) and self.undeclared is False:
            raise EvaluateException('Array expected', ps.Backtrace(elt))

        # The declared type is looked up but not checked; the call is kept
        # only so the sequence of operations stays as it was.
        _find_type(elt)

        offset = self.parse_offset(elt, ps)
        v = []
        vlen = 0
        if offset and not self.sparse:
            # Positions before the offset hold the fill value.
            while vlen < offset:
                vlen += 1
                v.append(self.fill)

        for c in _child_elements(elt):
            item = self.ofwhat.parse(c, ps)
            # The decompiled code discarded the parsed position and always
            # used the running offset.
            position = self.parse_position(c, ps) or offset
            if self.sparse:
                v.append((position, item))
            else:
                # Pad up to an explicit position that lies ahead.
                while offset < position:
                    offset += 1
                    v.append(self.fill)
                v.append(item)
            offset += 1

        return v

    def serialize(self, elt, sw, pyobj, name=None, childnames=None, **kw):
        """Serialize the sequence ``pyobj`` as an array element under ``elt``.

        ``childnames`` (or, failing that, ``self.childnames``) names the item
        elements; with neither set, items are named 'element' when
        ``self.ofwhat.aname`` is falsy.  For a sparse array ``pyobj`` must
        yield ``(position, value)`` pairs.  Returns None.
        """
        debug = self.logger.debugOn()
        if debug:
            self.logger.debug('serialize: %r' % pyobj)

        if self.mutable is False and sw.Known(pyobj):
            return None

        objid = _get_idstr(pyobj)
        ns, n = self.get_name(name, objid)
        el = elt.createAppendElement(ns, n)
        if self.nillable is True and pyobj is None:
            self.serialize_as_nil(el)
            return None

        self.set_attributes(el, pyobj)

        # The decompiled code ignored ``self.unique`` here.
        unique = self.unique or kw.get('unique', False)
        if unique is False and sw.Known(pyobj):
            self.set_attribute_href(el, objid)
            return None

        if kw.get('typed', self.typed) is True:
            self.set_attribute_xsi_type(el, **kw)

        if self.unique is False:
            self.set_attribute_id(el, objid)

        offset = 0
        # NOTE(review): the default ``nooffset=0`` is not the object False,
        # so with the default this branch is skipped.  Left unchanged because
        # altering it would change what existing callers put on the wire.
        if self.sparse is False and self.nooffset is False:
            # Skip leading fill values and announce how many were skipped.
            end = len(pyobj)
            while offset < end and pyobj[offset] == self.fill:
                offset += 1
            if offset:
                el.setAttributeNS(SOAP.ENC, 'offset', '[%d]' % offset)

        if self.undeclared is False:
            el.setAttributeNS(SOAP.ENC, 'arrayType', '%s:%s' % (el.getPrefix(self.atype[0]), self.atype[1]))

        if debug:
            self.logger.debug('ofwhat: %r' % self.ofwhat)

        d = {}
        # The decompiled code ignored the ``childnames`` argument.
        kn = childnames or self.childnames
        if kn:
            d['name'] = kn
        elif not self.ofwhat.aname:
            d['name'] = 'element'

        if self.sparse is False:
            for e in pyobj[offset:]:
                self.ofwhat.serialize(el, sw, e, **d)
        else:
            position = 0
            for pos, v in pyobj:
                # Announce a position only when it breaks the running
                # sequence.
                if pos != position:
                    el.setAttributeNS(SOAP.ENC, 'position', '[%d]' % pos)
                    position = pos

                self.ofwhat.serialize(el, sw, v, **d)
                position += 1
-
-
-
# Running the module as a script just prints the copyright notice.
if __name__ == '__main__':
    print(_copyright)
-
-